<?php

namespace app\common\command;

use think\Config;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\Log;
use think\Queue;
use Workerman\Timer;
use Workerman\Worker;

use app\common\library\device\cabinet\Analysis;
use app\common\library\device\cabinet\Logic;

/**
 * Console command that runs a Workerman worker hosting an MQTT subscriber.
 *
 * The worker connects to the broker described by the `mqtt` config entry,
 * subscribes to `ZHZN/#`, and forwards every decoded device message either to
 * a queue or straight to the cabinet logic. It can also start a second worker
 * that watches a directory for changed PHP files (non-daemon mode only).
 */
class WorkermanMqttServer extends Command
{
    /** Seconds to wait before reconnecting when `mqtt.reconnect_interval` is missing or not positive. */
    const DEFAULT_RECONNECT_INTERVAL = 5;

    // Service status counters, reported by the periodic health check.
    protected $status = [
        'start_time' => null,
        'last_message_time' => null,
        'message_count' => 0,
        'error_count' => 0
    ];

    /** @var \Workerman\Mqtt\Client|null The client currently considered "live"; callbacks of any other client are ignored. */
    protected $mqttClient = null;

    /** @var int|null Id of the pending one-shot reconnect timer, or null when none is scheduled. */
    protected $reconnectTimerId = null;

    /**
     * Declares the command name, the `action` argument and the `--daemon` option.
     */
    protected function configure()
    {
        $this->setName('shanyu_workman_mqtt')
            ->addArgument('action', Argument::OPTIONAL, "action  start|stop|restart|status|reload")
            ->addOption('daemon', null, Option::VALUE_OPTIONAL, '是否守护进程启动', '-d')
            ->setDescription('山屿台共享球杆柜小程序Mqtt订阅_workman');
    }

    /**
     * Validates the requested action, rewrites the global $argv into the layout
     * Workerman parses, and starts the worker(s).
     *
     * @param Input  $input
     * @param Output $output
     * @return void
     */
    protected function execute(Input $input, Output $output)
    {
        $logConfig = [
            // Log driver
            'type'  => 'File',
            // Log directory
            'path'  => ROOT_PATH . 'log' . DS . 'mqtt' . DS,
            // Recorded levels. 'info' is included because this class reports connection and
            // reconnect events through Log::info(); levels missing from this list are not written.
            'level' => ['error', 'log', 'debug', 'notice', 'sql', 'info'],
        ];
        Log::init($logConfig);

        $this->status['start_time'] = date('Y-m-d H:i:s');
        $output->writeln("启动workerman mqtt服务中 " . $this->status['start_time'] . PHP_EOL);

        global $argv;

        // The argument is optional, so it is null when omitted; cast before trimming.
        $action = trim((string) $input->getArgument('action'));
        if (!$input->hasArgument('action') || $action === '') {
            $output->writeln("启动命令不完整，" . $this->usageHint());
            return;
        }

        $allowCommand = ['start', 'stop', 'status', 'reload', 'restart'];
        if (!in_array($action, $allowCommand, true)) {
            $output->writeln("启动命令无效，" . $this->usageHint());
            return;
        }

        // Workerman reads the global $argv itself: slot 1 is the action, slot 2 the mode flag.
        $argv[0] = $argv[1];
        $argv[1] = $action;
        if ($input->hasOption('daemon')) {
            // Whatever value was supplied with --daemon, Workerman only understands "-d".
            $argv[2] = '-d';
        }

        if (isset($argv[3])) {
            unset($argv[3]);
        }

        // Start the service
        $this->handelWorkmanServer();
    }

    /**
     * Builds the usage hint shown for a missing or invalid action, using the
     * command's registered name so the hint always matches the real command.
     *
     * @return string
     */
    private function usageHint()
    {
        return "正确启动命令:php think " . $this->getName()
            . " start[start|stop|restart|status|reload] --daemon，不加--daemon非守护进程运行。";
    }

    /**
     * Creates the MQTT worker (and optionally the file-watch worker) and enters
     * the Workerman event loop.
     *
     * @return void
     * @throws \Exception Re-thrown after logging when startup fails.
     */
    private function handelWorkmanServer()
    {
        try {
            $config = $this->getMqttConfig();
            $worker = new Worker();
            $worker->onWorkerStart = function () use ($config) {
                $this->startMqttClient($config);
                $this->startHealthCheck();
            };
            $worker->onWorkerStop = function () {
                // Intentionally empty.
            };
            // NOTE(review): this worker is created without a listen address, so this
            // connection-close callback presumably never fires — confirm and remove if so.
            $worker->onClose = function ($aa) {
                Log::error("监听到设备短信");
                Log::error($aa);
            };

            // Start file monitoring. Both keys are optional, so test them without
            // raising an undefined-index notice.
            if (!empty($config['watch_file_path']) && !empty($config['watch_file_change'])) {
                $this->watchFiles($config['watch_file_path']);
            }

            Worker::runAll();
        } catch (\Exception $e) {
            Log::error("Workerman服务启动失败: " . $e->getMessage());
            throw $e;
        }
    }

    /**
     * Reads the `mqtt` config entry and checks that the broker address is present.
     *
     * @return array
     * @throws \RuntimeException When `server` or `port` is empty.
     */
    private function getMqttConfig()
    {
        $config = Config::get('mqtt');
        if (empty($config['server']) || empty($config['port'])) {
            throw new \RuntimeException('MQTT配置不完整');
        }
        return $config;
    }

    /**
     * Creates the MQTT client, wires its callbacks and connects.
     *
     * Any previously created client is closed first, so that at most one client
     * using the configured client_id exists in this process.
     *
     * @param array $config The `mqtt` config entry.
     * @return void
     */
    private function startMqttClient($config)
    {
        // A broker disconnects the existing session when a second client connects with
        // the same client id, so the old client must go before a new one is created.
        $this->releaseMqttClient();

        $options = [
            'clean_session' => true,
            'username' => $config['username'],
            'password' => $config['password'],
            'keepalive' => 30,
            'protocol_name' => 'MQTT',
            'protocol_level' => 4,
            'client_id' => $config['client_id'],
            'debug' => true,
            // Reconnection is driven by reconnect() below; turn off the client's own
            // retry so the two mechanisms do not compete.
            'reconnect_period' => 0,
            'will' => [
                'topic' => 'server_fault',
                'content' => '服务器故障',
                'qos' => 0,
                'retain' => 1
            ],
        ];

        $client = new \Workerman\Mqtt\Client("mqtt://{$config['server']}:{$config['port']}", $options);
        $this->mqttClient = $client;

        $reconnectInterval = isset($config['reconnect_interval']) ? $config['reconnect_interval'] : null;

        $client->onConnect = function ($mqtt) use ($client) {
            if ($this->mqttClient !== $client) {
                // A replaced client came back to life; it must not subscribe again.
                return;
            }
            // The live client is connected, so a pending replacement is no longer needed.
            $this->cancelReconnect();
            try {
                // Device topics
                $mqtt->subscribe('ZHZN/#');
                Log::info("MQTT连接成功并订阅主题");
            } catch (\Exception $e) {
                Log::error("MQTT订阅失败: " . $e->getMessage());
                $this->status['error_count']++;
            }
        };

        $client->onMessage = function ($topic, $content) {
            $this->status['last_message_time'] = date('Y-m-d H:i:s');
            $this->status['message_count']++;
            $this->handelMessage($topic, $content);
        };

        $client->onError = function ($exception = null) use ($client, $reconnectInterval) {
            if ($this->mqttClient !== $client) {
                return;
            }
            // The callback may be invoked without an exception object.
            $detail = $exception instanceof \Exception ? $exception->getMessage() : '未知错误';
            Log::error("MQTT错误: " . $detail);
            $this->status['error_count']++;
            $this->reconnect($reconnectInterval);
        };

        $client->onClose = function () use ($client, $reconnectInterval) {
            if ($this->mqttClient !== $client) {
                // Closed on purpose by releaseMqttClient(); nothing to recover.
                return;
            }
            // Log::warning() is not a level this Log class records; use notice instead.
            Log::notice("MQTT连接关闭，尝试重连...");
            $this->reconnect($reconnectInterval);
        };

        $client->connect();
    }

    /**
     * Closes and forgets the current MQTT client, if any.
     *
     * The reference is cleared before closing so that the close callback of the
     * old client sees it is no longer the live one and does not schedule a reconnect.
     *
     * @return void
     */
    private function releaseMqttClient()
    {
        if ($this->mqttClient === null) {
            return;
        }

        $stale = $this->mqttClient;
        $this->mqttClient = null;

        try {
            $stale->close();
        } catch (\Exception $e) {
            Log::error("MQTT旧连接关闭失败: " . $e->getMessage());
        }
    }

    /**
     * Decodes one device message and routes it: network/online/offline codes go
     * to the device queue, heartbeats are handled inline, everything else goes
     * to the default (order-related) queue.
     *
     * @param string $topic   Topic the message arrived on.
     * @param string $message Raw payload.
     * @return void
     */
    private function handelMessage($topic, $message)
    {
        Log::error("收到消息：" . $topic);
        Log::error([
            'topic' => $topic,
            'message' => $message,
            'timestamp' => date('Y-m-d H:i:s')
        ]);
        try {
            $data = Analysis::analysis($message);
            $code = $data['code'];
            $job_data = [
                'message' => $data,
                'topic' => $topic,
            ];
            if ($code == Analysis::CODE_YINGDAWANGLUO || $code == Analysis::CODE_SHANGXIAN || $code == Analysis::CODE_LIXIAN) {
                // Commands unrelated to the order flow share their own queue.
                $queue_default = config('queue.default_device');
                Queue::push("app\common\job\Device@deviceMessage", $job_data, $queue_default);
            } elseif ($code != Analysis::CODE_XINTIAO) {
                // Order-related commands go to the default queue.
                $queue_default = config('queue.default');
                Queue::push("app\common\job\Device@deviceMessage", $job_data, $queue_default);
            } else {
                // Heartbeats are processed synchronously.
                Logic::index($topic, $data);
            }
        } catch (\Exception $e) {
            Log::error("消息处理失败: " . $e->getMessage());
            $this->status['error_count']++;
        } catch (\Throwable $e) {
            // PHP 7+ engine errors (TypeError etc.) must not take the worker down
            // because of a single bad message.
            Log::error("消息处理失败: " . $e->getMessage());
            $this->status['error_count']++;
        }
    }

    /**
     * Schedules a single one-shot timer that replaces the MQTT client.
     *
     * Calls made while a timer is already pending are ignored, so an error that
     * is followed by a close results in one reconnect, not two.
     *
     * @param int|float|null $interval Delay in seconds; falls back to
     *                                 DEFAULT_RECONNECT_INTERVAL when missing or not positive.
     * @return void
     */
    private function reconnect($interval)
    {
        if ($this->reconnectTimerId !== null) {
            return;
        }

        if (!is_numeric($interval) || $interval <= 0) {
            $interval = self::DEFAULT_RECONNECT_INTERVAL;
        }

        $timerId = Timer::add($interval, function () {
            $this->reconnectTimerId = null;
            Log::info("尝试重新连接MQTT服务器");
            $this->startMqttClient(Config::get('mqtt'));
        }, null, false);

        $this->reconnectTimerId = $timerId === false ? null : $timerId;
    }

    /**
     * Cancels the pending reconnect timer, if there is one.
     *
     * @return void
     */
    private function cancelReconnect()
    {
        if ($this->reconnectTimerId === null) {
            return;
        }
        Timer::del($this->reconnectTimerId);
        $this->reconnectTimerId = null;
    }

    /**
     * Registers a second worker that polls a directory for modified PHP files
     * every 5 seconds. The poll timer is only installed when not daemonized.
     *
     * @param string $monitor_dir Directory to scan recursively.
     * @return void
     */
    private function watchFiles($monitor_dir)
    {
        if (!is_dir($monitor_dir)) {
            Log::error("文件监控目录不存在: {$monitor_dir}");
            return;
        }

        $worker = new Worker();
        $worker->name = 'Yoga_FileMonitor';
        $worker->reloadable = false;

        $last_mtime = time();
        $file_cache = [];

        $worker->onWorkerStart = function () use ($monitor_dir, &$last_mtime, &$file_cache) {
            if (!Worker::$daemonize) {
                Timer::add(5, function () use ($monitor_dir, &$last_mtime, &$file_cache) {
                    $this->checkFilesChange($monitor_dir, $last_mtime, $file_cache);
                });
            }
        };
    }

    /**
     * Scans the directory once; on the first PHP file whose mtime is newer than
     * the last seen one, sends SIGUSR1 to the parent process and stops scanning.
     *
     * @param string $monitor_dir Directory to scan recursively.
     * @param int    $last_mtime  Newest mtime seen so far (updated in place).
     * @param array  $file_cache  Map of file path => mtime (updated in place).
     * @return void
     */
    private function checkFilesChange($monitor_dir, &$last_mtime, &$file_cache)
    {
        clearstatcache();

        // The directory may disappear while the timer keeps running; the iterator
        // would throw inside the timer callback in that case.
        if (!is_dir($monitor_dir)) {
            return;
        }

        $dir_iterator = new \RecursiveDirectoryIterator($monitor_dir);
        $iterator = new \RecursiveIteratorIterator($dir_iterator);

        foreach ($iterator as $file) {
            if (pathinfo($file, PATHINFO_EXTENSION) != 'php') {
                continue;
            }

            $file_path = $file->getPathname();
            $current_mtime = $file->getMTime();

            $is_known = isset($file_cache[$file_path]) && $file_cache[$file_path] == $current_mtime;
            if ($is_known || $last_mtime >= $current_mtime) {
                continue;
            }

            Log::info("文件变更检测到: {$file_path}");
            posix_kill(posix_getppid(), SIGUSR1);
            $last_mtime = $current_mtime;
            $file_cache[$file_path] = $current_mtime;
            break;
        }
    }

    /**
     * Logs the status counters plus current memory usage every 600 seconds.
     *
     * @return void
     */
    private function startHealthCheck()
    {
        Timer::add(600, function () {
            $newData = $this->status;
            $newData['memory_usage'] = memory_get_usage(true) / 1024 / 1024 . 'MB';
            $this->showMqttStatus($newData);
        });
    }

    /**
     * Checks whether devices have gone silent and marks them offline.
     *
     * Clears the heartbeat hash once, then every 300 seconds removes entries
     * whose heartbeat is at least 120 seconds old and updates the matching
     * equipment rows to offline.
     *
     * NOTE(review): nothing in this class calls this method, and `PredisLib` and
     * `YogaAccessControlEquipment` are not imported in this file, so they would
     * resolve to the current namespace. Add the imports before wiring it in,
     * or remove the method if it is a leftover.
     *
     * @return void
     * @throws \RedisException
     */
    private function checkAccessControlEquipmentOnClose()
    {
        $key = config('redis.check_equipment_status_key');
        // Clear stored device info when the service starts.
        $redisClient = PredisLib::getRedisInstance();
        $redisClient->select(config('queue.select'));
        $redisClient->del($key);
        $redisClient->close();
        unset($redisClient);

        Timer::add(300, function () use ($key) {
            $redisClient = PredisLib::getRedisInstance();
            $redisClient->select(config('queue.select'));
            $equipmentList = $redisClient->hGetAll($key);
            if ($equipmentList) {
                $updateIds = [];
                foreach ($equipmentList as $facesluiceId => $heartbeatTime) {
                    if (time() - $heartbeatTime >= 120) {
                        $updateIds[] = $facesluiceId;
                        // Remove the stale entry from redis.
                        $redisClient->hDel($key, $facesluiceId);
                    }
                }
                if ($updateIds) {
                    // Mark the devices offline.
                    YogaAccessControlEquipment::update(
                        ['online_status' => YogaAccessControlEquipment::ONLINE_STATUS_OFFLINE],
                        ['identifier' => ['in', $updateIds]]
                    );
                }
            }
            $redisClient->close();
            unset($redisClient);
        });
    }

    /**
     * Writes the MQTT service status to the log as a bordered text table.
     *
     * @param array $data Status counters plus `memory_usage`.
     * @return void
     */
    private function showMqttStatus($data)
    {
        // Build the bordered table
        $border = "+----------------------+----------------------+";
        $table = "\n" . "+-----------------MQTT服务状态------------------+" . "\n";
        $rows = [
            '开始时间'        => $data['start_time'],
            '最后消息时间'    => $data['last_message_time'],
            '消息数量'        => $data['message_count'],
            '错误数量'        => $data['error_count'],
            '内存使用'        => $data['memory_usage'],
        ];
        foreach ($rows as $label => $value) {
            $table .= sprintf("| %-20s | %-20s |\n", $label, $value);
            $table .= $border . "\n";
        }
        Log::error($table);
    }
}